bug: Fix memory reservation and allocation problems for SortExec #14644
Conversation
Fix batch memory consumption growth after sorting; Reserve memory more aggressively to compensate for memory needed for merging.
2f7f403 to 8cc9aea
let value = self.sort.expr.evaluate(batch)?;
let array = value.into_array(batch.num_rows())?;
let size_in_mem = array.get_buffer_memory_size();
let array = array.as_any().downcast_ref::<T>().expect("field values");
Ok(ArrayValues::new(self.sort.options, array))
let mut array_reservation = self.reservation.new_empty();
array_reservation.try_grow(size_in_mem)?;
Ok(ArrayValues::new(
    self.sort.options,
    array,
    array_reservation,
))
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this reservation is needed.
When the sort expression is a simple column reference, the `array` simply reuses the buffer in `batch`; this is the case where the reservation is not needed. However, the sort expression can also be a complex expression such as `l_linenumber + 1`, and in that case the result of evaluation takes additional space. Always reserving for `array` is a more conservative approach that prevents allocations from overshooting the limit.
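For illustration, here is a minimal standalone sketch of the same reservation pattern (the pool size, consumer name, and array are made up for this example; the actual change is in the diff above):

use std::sync::Arc;
use arrow::array::{Array, Int64Array};
use datafusion::execution::memory_pool::{GreedyMemoryPool, MemoryConsumer, MemoryPool};

fn main() -> datafusion::error::Result<()> {
    // A pool with a 1 MiB limit and a consumer that tracks sort-key buffers.
    let pool: Arc<dyn MemoryPool> = Arc::new(GreedyMemoryPool::new(1024 * 1024));
    let mut reservation = MemoryConsumer::new("sort_keys").register(&pool);

    // Pretend this array is the result of evaluating `l_linenumber + 1`: it owns a
    // freshly allocated buffer instead of reusing the input batch's buffer.
    let sort_key = Int64Array::from_iter_values(0..10_000);
    reservation.try_grow(sort_key.get_buffer_memory_size())?;

    // Dropping the reservation (or calling `free()`) returns the memory to the pool.
    Ok(())
}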
.with_reservation(self.reservation.new_empty())
.with_reservation(self.merge_reservation.new_empty())
I believe we should use `merge_reservation` here, because the allocations happening in the stream built here are for merging.
Thanks @Kontinuation, this looks incredibly helpful.
cc @tustvold @Dandandan @crepererum
@andygrove cc, as this ticket is directly related to Comet working on cloud instances.
// write sorted batches to disk when the memory is insufficient.
let mut spill_writer: Option<IPCWriter> = None;
// Leave at least 1/3 of spill reservation for sort/merge the next batch. Here the
// 1/3 is simply an arbitrarily chosen number.
I'm wondering whether we should make this number configurable as a DataFusion parameter? I feel that, depending on the data, this number can fluctuate; it might be different for short, plain data vs deeply nested, wide rows.
I found that `SortPreservingMergeStream` will never reserve memory after producing the first merged batch when merging in-memory batches, so there's no need to reserve additional space for merging in this place. I have removed the additional memory reservation in a new commit.
Yes, this feature is quite bug-prone; perhaps we should mark it as experimental to prevent someone from using it in production. Thank you so much for the efforts. Here are my thoughts on the changes:
The 2X memory problem is: when a query is specified to run under 100M of memory, the measured physical memory is 200M (though the reality is even worse than 2X 🤦🏼, see #14142).
I can get the high-level idea of the more conservative memory accounting in point 3 (it is also used to account for merged batches), but I get lost in the memory estimation details in the implementation (especially: is this 2X estimation amplification only for merged batches, or also for the intermediate `Rows`?).
I think the buffer resizing mechanism is not doubling each time; the default policy will allocate new constant-size buffers (https://docs.rs/arrow-array/54.1.0/src/arrow_array/builder/generic_bytes_view_builder.rs.html#120-122), so this change might not help.
Actually it helps. I have added a new test case. Here is where the 2X buffer growth comes from:
The 2X amplification is mainly for the intermediate `Rows`. Let's assume that each batch is 10 MB and we have a 100 MB memory budget. The following diagram shows how the memory consumption becomes 100 MB when performing merging. Here is the detailed explanation:
BTW, the 2X amplification for intermediate
This can be worked around by configuring a larger
I had another interesting observation: spilling sort can be faster than memory-unbounded sort in DataFusion. I tried running sort-tpch Q3 using this PR with #14642 cherry-picked onto it, and configured
When running without a memory limit, we are merging tons of small sorted streams, which seems to be bad for performance. A memory limit forces us to do merging before ingesting all the batches, so we do several smaller merges first and a final merge at the end to produce the result set. Coalescing batches into larger streams before merging seems to be a good idea.
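As an aside, here is a hedged sketch of what such coalescing could look like using arrow's `concat_batches` (a toy schema and hand-built batches, not code from this PR):

use std::sync::Arc;
use arrow::array::{ArrayRef, Int64Array, RecordBatch};
use arrow::compute::concat_batches;
use arrow::error::ArrowError;
use arrow_schema::{DataType, Field, Schema};

fn main() -> Result<(), ArrowError> {
    let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int64, false)]));
    // Two small batches belonging to the same sorted run...
    let c1: ArrayRef = Arc::new(Int64Array::from(vec![1, 2, 3]));
    let c2: ArrayRef = Arc::new(Int64Array::from(vec![4, 5, 6]));
    let b1 = RecordBatch::try_new(schema.clone(), vec![c1])?;
    let b2 = RecordBatch::try_new(schema.clone(), vec![c2])?;
    // ...coalesced into one larger batch before it is handed to the k-way merge.
    let coalesced = concat_batches(&schema, &[b1, b2])?;
    assert_eq!(coalesced.num_rows(), 6);
    Ok(())
}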
Not directly related to the point of this PR, but regarding this, I hope apache/datafusion-comet#1369 helps. This may cause more spilling, but I expect the chance of OOM is reduced. Still, accurate tracking of memory is important; otherwise mis-tracking still causes OOM.
use arrow::array::{RecordBatch, StringBuilder};
use arrow_schema::{DataType, Field, Schema};
use datafusion::execution::disk_manager::DiskManagerConfig;
use datafusion::execution::memory_pool::FairSpillPool;
use datafusion::execution::runtime_env::RuntimeEnvBuilder;
use datafusion::parquet::arrow::ArrowWriter;
use datafusion::prelude::{ParquetReadOptions, SessionConfig, SessionContext};
use futures::TryStreamExt;
use std::sync::Arc;
#[tokio::main(flavor = "multi_thread", worker_threads = 1)]
pub async fn main() {
build_parquet();
let env = RuntimeEnvBuilder::new()
.with_disk_manager(DiskManagerConfig::default())
.with_memory_pool(Arc::new(FairSpillPool::new(100 * 1024 * 1024)))
.build_arc()
.unwrap();
let mut config = SessionConfig::new().with_sort_spill_reservation_bytes(32 * 1024 * 1024);
config.options_mut().execution.parquet.schema_force_view_types = false;
let ctx = SessionContext::new_with_config_rt(config, env);
ctx.register_parquet(
"big_strings",
"/tmp/big_strings.parquet",
ParquetReadOptions::default(),
)
.await
.unwrap();
let sql = "SELECT * FROM big_strings ORDER BY strings";
println!("Sorting strings");
ctx.sql(sql)
.await
.unwrap()
.execute_stream()
.await
.unwrap()
.try_for_each(|_| std::future::ready(Ok(())))
.await
.unwrap();
}
fn build_parquet() {
if std::fs::File::open("/tmp/big_strings.parquet").is_ok() {
println!("Using existing file at /tmp/big_strings.parquet");
return;
}
println!("Generating test file at /tmp/big_strings.parquet");
let file = std::fs::File::create("/tmp/big_strings.parquet").unwrap();
let schema = Arc::new(Schema::new(vec![Field::new(
"strings",
DataType::Utf8,
false,
)]));
let mut writer = ArrowWriter::try_new(file, schema.clone(), None).unwrap();
for batch_idx in 0..100 {
println!("Generating batch {} of 100", batch_idx);
let mut string_array_builder =
StringBuilder::with_capacity(1024 * 1024, 1024 * 1024 * 3 * 14);
for i in 0..(1024 * 1024) {
string_array_builder
.append_value(format!("string-{}string-{}string-{}", i, i, i));
}
let array = Arc::new(string_array_builder.finish());
let batch = RecordBatch::try_new(schema.clone(), vec![array]).unwrap();
writer.write(&batch).unwrap();
}
writer.close().unwrap();
}

called `Result::unwrap()` on an `Err` value: ResourcesExhausted("Failed to allocate additional 353536 bytes for ExternalSorterMerge[1] with 22948928 bytes already allocated for this reservation - 127190 bytes remain available for the total pool")

Thank you @kazuyukitanimura for the PR. I applied the PR to try to fix the test, but the above test still fails for me; I am not sure if I am missing something.
There are 2 problems:
One possible fix for problem 2 is to use a smaller batch size when writing batches to spill files, so that the unspillable memory required for the final spill-read merging will be smaller. Or we could simply leave this problem as is and require the user to raise the memory limit.
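For illustration, a sketch of how a sorted batch could be split into smaller zero-copy slices before being written to a spill file (the helper name and row limit are hypothetical, not part of this PR):

use arrow::array::RecordBatch;

/// Split `batch` into slices of at most `max_rows` rows each, so that the batches
/// read back from the spill file during the final merge are smaller.
fn split_for_spill(batch: &RecordBatch, max_rows: usize) -> Vec<RecordBatch> {
    let mut slices = Vec::new();
    let mut offset = 0;
    while offset < batch.num_rows() {
        let len = max_rows.min(batch.num_rows() - offset);
        // `slice` is zero-copy; the smaller footprint materializes once the slices
        // are serialized to the spill file and read back individually.
        slices.push(batch.slice(offset, len));
        offset += len;
    }
    slices
}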
Thank you @Kontinuation for the good explanation, it makes sense to me, I will try it. And for problem 2, is it possible to introduce a spillable merging phase? Would that be safer?
self.sort_or_spill_in_mem_batches().await?;
// We've already freed more than half of reserved memory,
// so we can grow the reservation again. There's nothing we can do
// if this try_grow fails.
Reserves more memory for ingested batches to leave some room for merging. This PR reserves 2X memory for each batch, which works for most of the queries in the sort-tpch benchmark (all except Q5 and Q8). Users still have to configure `sort_spill_reservation_bytes` when the memory reserved is not big enough for merging. I don't think it is a good change, but this is the only solution I can think of to compensate for the extra memory usage of the row representation of sorted columns.
Is it possible to make the merging phase also spillable?
We can implement a more complicated multi-stage merging phase which merges a small portion of the streams each time and performs multiple rounds of merges to produce the final merged stream. However, this approach is quite complex and out of scope for this PR. I believe there should be dedicated discussions around this problem.
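To make the idea concrete, here is a toy sketch of multi-pass merging over in-memory integer runs (real spilled streams would replace the `Vec`s, and the fan-in value is arbitrary):

/// Merge two sorted runs into one sorted run.
fn merge_two(a: &[i32], b: &[i32]) -> Vec<i32> {
    let mut out = Vec::with_capacity(a.len() + b.len());
    let (mut i, mut j) = (0, 0);
    while i < a.len() && j < b.len() {
        if a[i] <= b[j] {
            out.push(a[i]);
            i += 1;
        } else {
            out.push(b[j]);
            j += 1;
        }
    }
    out.extend_from_slice(&a[i..]);
    out.extend_from_slice(&b[j..]);
    out
}

/// Repeatedly merge groups of at most `fan_in` runs until a single run remains,
/// bounding how many runs (and how much memory) are open at any one time.
fn multi_pass_merge(mut runs: Vec<Vec<i32>>, fan_in: usize) -> Vec<i32> {
    while runs.len() > 1 {
        runs = runs
            .chunks(fan_in)
            .map(|group| group.iter().fold(Vec::new(), |acc, run| merge_two(&acc, run)))
            .collect();
    }
    runs.pop().unwrap_or_default()
}

fn main() {
    let runs = vec![vec![1, 4, 7], vec![2, 5, 8], vec![3, 6, 9], vec![0, 10]];
    assert_eq!(multi_pass_merge(runs, 2), vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
}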
I agree, we can add more dedicated discussions around this problem.
We can implement a more complicated multi-stage merging phase which merges a small portion of streams each time, and perform multiple rounds of merges to produce the final merged stream. However this approach is quite complex and out of the scope of this PR. I believe that there should be dedicated discussions around this problem.
I agree we should implement such a feature as a dedicated, follow-on PR / project.
@zhuqi-lucas or @Kontinuation, could one of you file a ticket to track the work? I think it is especially important to highlight the cases where the current code won't work well.
It sounds like there are several issues:
- ensuring we can always spill data (now spilling will sometimes fail if we run out of memory to sort the batches in)
- Ensuring that we can always merge the data that was spilled, even if it had a really wide fanout (like 1000 of spill files)
I think problem 1 could be solved by potentially spilling unsorted batches (and then sorting them separately). This would be less efficient (we would read/write some tuples twice) but it would work.
Problem 2 could use the multi-pass merge that @Kontinuation describes
Updated, it works after changing to 1 partition and increasing the memory limit.
// data types due to exponential growth when building the sort columns. We shrink the columns
// to prevent memory reservation failures, as well as excessive memory allocation when running
// merges in `SortPreservingMergeStream`.
columns.iter_mut().for_each(|c| {
Will it be expensive to do this for all columns of every batch, or can we filter down to just the columns that actually need to shrink?
It seems you already benchmarked it; maybe we can add the benchmark code as well.
This `shrink_to_fit` is basically a no-op for primitive-type columns produced by `take_arrays`, because their internal buffers already have the right capacity; it will only perform reallocations for columns with variable-length data. So I don't think there's a need to cherry-pick which columns to shrink.
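To illustrate the effect being discussed, here is a small self-contained example (assuming an arrow-rs version that provides `Array::shrink_to_fit`; the capacities and numbers are made up and depend on the allocator):

use arrow::array::{Array, StringBuilder};

fn main() {
    // Build a small string array through a builder with a deliberately oversized data
    // buffer, mimicking the slack left behind by buffer growth during merging.
    let mut builder = StringBuilder::with_capacity(16, 1024 * 1024);
    for i in 0..16 {
        builder.append_value(format!("string-{i}"));
    }
    let mut array = builder.finish();

    let before = array.get_buffer_memory_size();
    array.shrink_to_fit(); // reallocates variable-length buffers down to their used size
    let after = array.get_buffer_memory_size();

    // `after` should be much smaller than `before` for this variable-length column.
    println!("before: {before} bytes, after: {after} bytes");
}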
I've also benchmarked sorting using utf8 columns and have not observed significant performance overhead:
merge sorted utf8 low cardinality
time: [3.8824 ms 3.8903 ms 3.8987 ms]
change: [-2.8027% -2.1227% -1.5573%] (p = 0.00 < 0.05)
Performance has improved.
Found 4 outliers among 100 measurements (4.00%)
3 (3.00%) high mild
1 (1.00%) high severe
sort merge utf8 low cardinality
time: [4.2295 ms 4.2360 ms 4.2430 ms]
change: [+0.5975% +0.8722% +1.1242%] (p = 0.00 < 0.05)
Change within noise threshold.
Found 15 outliers among 100 measurements (15.00%)
2 (2.00%) low mild
5 (5.00%) high mild
8 (8.00%) high severe
sort utf8 low cardinality
time: [6.4265 ms 6.4369 ms 6.4483 ms]
change: [-0.3276% -0.0658% +0.1908%] (p = 0.62 > 0.05)
No change in performance detected.
Found 7 outliers among 100 measurements (7.00%)
5 (5.00%) high mild
2 (2.00%) high severe
sort partitioned utf8 low cardinality
time: [343.87 µs 347.16 µs 351.07 µs]
change: [-1.1291% +0.3066% +1.8160%] (p = 0.68 > 0.05)
No change in performance detected.
Found 15 outliers among 100 measurements (15.00%)
5 (5.00%) high mild
10 (10.00%) high severe
merge sorted utf8 high cardinality
time: [5.9968 ms 6.0083 ms 6.0207 ms]
change: [-1.9398% -1.6215% -1.2928%] (p = 0.00 < 0.05)
Performance has improved.
Found 8 outliers among 100 measurements (8.00%)
7 (7.00%) high mild
1 (1.00%) high severe
sort merge utf8 high cardinality
time: [6.4266 ms 6.4399 ms 6.4558 ms]
change: [-0.5594% -0.2292% +0.1020%] (p = 0.19 > 0.05)
No change in performance detected.
Found 6 outliers among 100 measurements (6.00%)
2 (2.00%) high mild
4 (4.00%) high severe
sort utf8 high cardinality
time: [7.7403 ms 7.7541 ms 7.7693 ms]
change: [-2.7779% -2.1541% -1.6176%] (p = 0.00 < 0.05)
Performance has improved.
Found 6 outliers among 100 measurements (6.00%)
2 (2.00%) high mild
4 (4.00%) high severe
sort partitioned utf8 high cardinality
time: [364.21 µs 370.21 µs 376.41 µs]
change: [+2.2461% +4.2833% +6.3333%] (p = 0.00 < 0.05)
Performance has regressed.
The sort, sort_tpch and tpch10 benchmarks also showed little difference in performance.
Comparing main and fix-sort-mem-usage-reserve-mem-for-sort-merging
--------------------
Benchmark sort.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━┓
┃ Query ┃ main ┃ fix-sort-mem-usage-reserve-mem-for-sort-merging ┃ Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━┩
│ Qsort utf8 │ 28514.66ms │ 28451.16ms │ no change │
│ Qsort int │ 29749.57ms │ 29879.78ms │ no change │
│ Qsort │ 28608.24ms │ 29085.31ms │ no change │
│ decimal │ │ │ │
│ Qsort │ 31013.98ms │ 31126.24ms │ no change │
│ integer │ │ │ │
│ tuple │ │ │ │
│ Qsort utf8 │ 28925.23ms │ 29281.38ms │ no change │
│ tuple │ │ │ │
│ Qsort mixed │ 30579.63ms │ 30550.25ms │ no change │
│ tuple │ │ │ │
└──────────────┴────────────┴─────────────────────────────────────────────────┴───────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━┓
┃ Benchmark Summary ┃ ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━┩
│ Total Time (main) │ 177391.31ms │
│ Total Time (fix-sort-mem-usage-reserve-mem-for-sort-merging) │ 178374.12ms │
│ Average Time (main) │ 29565.22ms │
│ Average Time (fix-sort-mem-usage-reserve-mem-for-sort-merging) │ 29729.02ms │
│ Queries Faster │ 0 │
│ Queries Slower │ 0 │
│ Queries with No Change │ 6 │
└────────────────────────────────────────────────────────────────┴─────────────┘
--------------------
Benchmark sort_tpch.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━┓
┃ Query ┃ main ┃ fix-sort-mem-usage-reserve-mem-for-sort-merging ┃ Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━┩
│ Q1 │ 187.27ms │ 187.75ms │ no change │
│ Q2 │ 154.92ms │ 157.82ms │ no change │
│ Q3 │ 885.18ms │ 893.74ms │ no change │
│ Q4 │ 184.50ms │ 189.54ms │ no change │
│ Q5 │ 315.13ms │ 322.19ms │ no change │
│ Q6 │ 335.00ms │ 338.65ms │ no change │
│ Q7 │ 584.88ms │ 594.44ms │ no change │
│ Q8 │ 452.66ms │ 460.51ms │ no change │
│ Q9 │ 472.15ms │ 475.38ms │ no change │
│ Q10 │ 681.58ms │ 685.07ms │ no change │
└──────────────┴──────────┴─────────────────────────────────────────────────┴───────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━┓
┃ Benchmark Summary ┃ ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━┩
│ Total Time (main) │ 4253.28ms │
│ Total Time (fix-sort-mem-usage-reserve-mem-for-sort-merging) │ 4305.10ms │
│ Average Time (main) │ 425.33ms │
│ Average Time (fix-sort-mem-usage-reserve-mem-for-sort-merging) │ 430.51ms │
│ Queries Faster │ 0 │
│ Queries Slower │ 0 │
│ Queries with No Change │ 10 │
└────────────────────────────────────────────────────────────────┴───────────┘
--------------------
Benchmark sort_tpch_sf10.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query ┃ main ┃ fix-sort-mem-usage-reserve-mem-for-sort-merging ┃ Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ Q1 │ 2617.74ms │ 2652.65ms │ no change │
│ Q2 │ 2019.64ms │ 2034.08ms │ no change │
│ Q3 │ 10748.55ms │ 11028.78ms │ no change │
│ Q4 │ 2565.69ms │ 2581.39ms │ no change │
│ Q5 │ 3182.88ms │ 3226.93ms │ no change │
│ Q6 │ 3379.76ms │ 3432.35ms │ no change │
│ Q7 │ 7200.46ms │ 7245.30ms │ no change │
│ Q8 │ 4932.09ms │ 5133.81ms │ no change │
│ Q9 │ 5488.64ms │ 5473.89ms │ no change │
│ Q10 │ 18188.22ms │ 17129.05ms │ +1.06x faster │
└──────────────┴────────────┴─────────────────────────────────────────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
┃ Benchmark Summary ┃ ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
│ Total Time (main) │ 60323.67ms │
│ Total Time (fix-sort-mem-usage-reserve-mem-for-sort-merging) │ 59938.23ms │
│ Average Time (main) │ 6032.37ms │
│ Average Time (fix-sort-mem-usage-reserve-mem-for-sort-merging) │ 5993.82ms │
│ Queries Faster │ 1 │
│ Queries Slower │ 0 │
│ Queries with No Change │ 9 │
└────────────────────────────────────────────────────────────────┴────────────┘
--------------------
Benchmark tpch_sf10.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━┓
┃ Query ┃ main ┃ fix-sort-mem-usage-reserve-mem-for-sort-merging ┃ Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━┩
│ QQuery 1 │ 1003.40ms │ 974.06ms │ no change │
│ QQuery 2 │ 142.90ms │ 142.05ms │ no change │
│ QQuery 3 │ 437.35ms │ 429.21ms │ no change │
│ QQuery 4 │ 218.20ms │ 219.31ms │ no change │
│ QQuery 5 │ 638.99ms │ 633.81ms │ no change │
│ QQuery 6 │ 152.49ms │ 151.94ms │ no change │
│ QQuery 7 │ 937.33ms │ 952.74ms │ no change │
│ QQuery 8 │ 690.88ms │ 675.75ms │ no change │
│ QQuery 9 │ 1055.28ms │ 1039.38ms │ no change │
│ QQuery 10 │ 621.41ms │ 632.68ms │ no change │
│ QQuery 11 │ 93.62ms │ 100.54ms │ 1.07x slower │
│ QQuery 12 │ 321.36ms │ 329.27ms │ no change │
│ QQuery 13 │ 442.88ms │ 434.09ms │ no change │
│ QQuery 14 │ 252.07ms │ 252.79ms │ no change │
│ QQuery 15 │ 419.63ms │ 414.17ms │ no change │
│ QQuery 16 │ 106.30ms │ 107.51ms │ no change │
│ QQuery 17 │ 1088.73ms │ 1083.62ms │ no change │
│ QQuery 18 │ 1795.68ms │ 1785.46ms │ no change │
│ QQuery 19 │ 462.31ms │ 458.10ms │ no change │
│ QQuery 20 │ 403.54ms │ 428.10ms │ 1.06x slower │
│ QQuery 21 │ 1453.76ms │ 1454.77ms │ no change │
│ QQuery 22 │ 158.43ms │ 151.23ms │ no change │
└──────────────┴───────────┴─────────────────────────────────────────────────┴──────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
┃ Benchmark Summary ┃ ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
│ Total Time (main) │ 12896.55ms │
│ Total Time (fix-sort-mem-usage-reserve-mem-for-sort-merging) │ 12850.56ms │
│ Average Time (main) │ 586.21ms │
│ Average Time (fix-sort-mem-usage-reserve-mem-for-sort-merging) │ 584.12ms │
│ Queries Faster │ 0 │
│ Queries Slower │ 2 │
│ Queries with No Change │ 20 │
└────────────────────────────────────────────────────────────────┴────────────┘
It makes sense.
// We'll gradually collect the sorted stream into self.in_mem_batches, or directly
// write sorted batches to disk when the memory is insufficient.
let mut spill_writer: Option<IPCWriter> = None;
while let Some(batch) = sorted_stream.next().await {
This is a good point to avoid OOM for try_collect().
Thanks for the nice diagram, this explanation is super clear. Regarding point 3, I thought of an edge case that can cause
Edge case: let's say the input is a deduplicated
For point 4, does the memory budget to hold merged batches come from
I agree that the current implementation uses a very rough estimation, and it could be way off from the actual memory consumption. A better approach is to sort and generate the row representation of the batch right after we ingest it; then we would know the exact size of the sorted batches and their row representations held in memory. The merge phase for handling spilling could simply take this data and perform merging without reserving more memory. However, this conflicts with some of the optimizations we did in the past:
Yes. It may come from
First of all, thank you so much @Kontinuation and @zhuqi-lucas - I think this PR is very nicely coded, and commented and is quite clear to read.
I think the only other thing left in my mind is to add a few more fuzz tests, but otherwise this is nice
@2010YOUY01 made a nice set of tests too to verify that the memory accounting was accurate, but they are currently disabled. Maybe we can also run them here too 🤔
cc @kazuyukitanimura
cc @westonpace as you filed #10073
I think it's a good high-level design to solve the 2X memory issue in spilling sort queries. I also checked the implementation; overall it looks good to me, and I left some nit-picking suggestions.
Now I think the TODOs before merging are adding physical memory consumption validation and filing issues for known tasks.
Validate physical consumption
In theory, I think this PR can solve the 2X memory problem for memory-limited sort queries, but it still can't solve the 2X memory issue when running without a memory limit but with spilling enabled.
Examples:
- This PR can't solve: `select * from t` consumes 2GB of memory to fully materialize the output; in the best case `select * from t order by c1` consumes around 2GB of memory, given sorting is an O(1) space operation. The current situation is that it consumes 4GB+ (more than 2 times).
- This PR can solve: if you specify the memory limit as 1GB, the actual physical memory consumption would be more than 2GB. The ideal actual consumption is around 1GB.
Problem
I tried one query and this PR is not working as expected. I specified a query to run under 5GB of memory (`select *` without ordering requires 7GB) but it's still consuming around 10GB, could you double check? I suspect we missed some details.
I made a mistake, there is no problem: the result to check is measured RSS < memory limit + result size
Reproducer
Under `datafusion-cli`, run
/usr/bin/time -l cargo run --release -- --mem-pool-type fair -m 5G -f '/Users/yongting/Code/scripts/external_sort.sql'
SQL script (you have to modify the tpch_sf10 path); I did not include string columns because I think there are some known issues.
CREATE EXTERNAL TABLE IF NOT EXISTS lineitem (
l_orderkey BIGINT,
l_partkey BIGINT,
l_suppkey BIGINT,
l_linenumber INTEGER,
l_quantity DECIMAL(15, 2),
l_extendedprice DECIMAL(15, 2),
l_discount DECIMAL(15, 2),
l_tax DECIMAL(15, 2),
l_returnflag VARCHAR,
l_linestatus VARCHAR,
l_shipdate DATE,
l_commitdate DATE,
l_receiptdate DATE,
l_shipinstruct VARCHAR,
l_shipmode VARCHAR,
l_comment VARCHAR
) STORED AS parquet
LOCATION '/Users/yongting/Code/datafusion/benchmarks/data/tpch_sf10/lineitem';
-- selected all non-varchar columns
SELECT l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_shipdate, l_commitdate, l_receiptdate FROM lineitem ORDER BY l_orderkey;
Result is around 10GB:
10703093760 maximum resident set size
Once fixed, I think the following memory bounds can be set smaller (e.g. `*3` -> `*1.5`) for regression tests:
datafusion/datafusion/core/tests/memory_limit/memory_limit_validation/sort_mem_validation.rs, line 175 in 2238680:
80_000_000 * 3,
Known issues
I think this PR has already made improvements over the current status, so those issues only need to be tracked.
Update: filed #14748 and apache/arrow-rs#7151
- After merging, memory consumption of `StringViewArray` will increase, see the reproducer in External sorting not working for (maybe only for string columns??) #12136 (comment). This PR uses a workaround to eagerly flush merged batches, instead of directly fixing it.
- This PR uses 2x to estimate memory consumption for sorted batches and the `Rows` generated in intermediate steps to accelerate comparisons; it works for common cases but can still fail. See bug: Fix memory reservation and allocation problems for SortExec #14644 (comment) and bug: Fix memory reservation and allocation problems for SortExec #14644 (comment). The solution can be: when reading a new batch, first do the col->row conversion and measure the memory consumption (see the sketch below), though this requires some significant change to the current code structure.
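For illustration, a sketch of measuring the row representation up front with arrow's `RowConverter` (the column and sort field here are made up):

use std::sync::Arc;
use arrow::array::{ArrayRef, Int64Array};
use arrow::error::ArrowError;
use arrow::row::{RowConverter, SortField};
use arrow_schema::DataType;

fn main() -> Result<(), ArrowError> {
    // Convert a sort column to the row format right away and measure its actual size,
    // instead of guessing with a 2x multiplier.
    let col: ArrayRef = Arc::new(Int64Array::from_iter_values(0..8192));
    let converter = RowConverter::new(vec![SortField::new(DataType::Int64)])?;
    let rows = converter.convert_columns(&[col])?;

    // `Rows::size()` reports the memory used by the row representation; this value
    // could be fed into a memory reservation before merging.
    println!("row representation uses {} bytes", rows.size());
    Ok(())
}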
self.reservation.free();
self.spills.push(spill_file);
} else {
self.in_mem_batches.push(batch);
I think adding another field `in_mem_batches_sorted` for this purpose can slightly improve readability.
// Release the memory reserved for merge back to the pool so
// there is some left when `in_memo_sort_stream` requests an
// there is some left when `in_mem_sort_stream` requests an
// allocation.
// allocation.
// allocation. At the end of this function, memory will be reserved again for the next spill.
I found this pre-reserved memory confusing when reading this code for the first time, so I would like to make it more clear.
@@ -612,6 +659,20 @@ impl ExternalSorter {
}
}

/// Estimate how much memory is needed to sort a `RecordBatch`.
I suggest explicitly pointing out that the extra memory is reserved for the potential `Row` conversion of the original array, which is used to speed up comparisons in sorting and merging.
I think this is by design. A physical operator should not reserve memory for the batches it produces. It is the parent operator's duty to reserve memory for batches fetched from child operators if it needs to hold them in memory for a long time.
It is a problem of datafusion-cli. If datafusion-cli decides to hold all the result batches in memory, it should create a memory consumer for itself and reserve memory for the result batches. datafusion-cli calls `collect` to hold the entire result set in memory before displaying it, which is unnecessary when `maxrows` is not unlimited. I've tried the following code to replace the `collect` call, and the maximum resident set size was reduced to 4.8 GB:

// Bounded stream; collected results are printed after all input consumed.
let schema = physical_plan.schema();
let mut stream = execute_stream(physical_plan, task_ctx.clone())?;
let mut results = vec![];
while let Some(batch) = stream.next().await {
let batch = batch?;
results.push(batch);
if let MaxRows::Limited(max_rows) = print_options.maxrows {
if results.len() >= max_rows {
break;
}
}
}
adjusted.into_inner().print_batches(schema, &results, now)?;
Sorry, I mistakenly edited your original reply @Kontinuation, I'm trying to revert it back.
Yes, you are right and the result is good. I made a mistake about the expected behavior: it should be
Good finding, maybe we can file an issue to improve the datafusion-cli memory usage and reservation.
2e87dbb to babe5cd
Thanks for the ping and thanks for working on this! This is an important feature for us (for training secondary indices on string columns) so I'm very thankful to see the effort 😄 I tried the reproducer from #10073 on this branch (babe5cd) and wasn't able to get it to pass, so I agree it doesn't seem to address all issues. However, what you're describing does seem to address the problems that I was seeing, so I think it is probably making good progress. I also tried setting
Hi @westonpace, I think the problem is that we need to set the partition count and also increase the memory limit for your case:
I can share the code that passes for your case; also clean up the previously built parquet file and retry:

use arrow::array::{RecordBatch, StringBuilder};
use arrow_schema::{DataType, Field, Schema};
use datafusion::execution::disk_manager::DiskManagerConfig;
use datafusion::execution::memory_pool::FairSpillPool;
use datafusion::execution::runtime_env::RuntimeEnvBuilder;
use datafusion::parquet::arrow::ArrowWriter;
use datafusion::prelude::{ParquetReadOptions, SessionConfig, SessionContext};
use futures::TryStreamExt;
use std::sync::Arc;
#[tokio::main(flavor = "multi_thread", worker_threads = 1)]
pub async fn main() {
build_parquet();
let env = RuntimeEnvBuilder::new()
.with_disk_manager(DiskManagerConfig::default())
.with_memory_pool(Arc::new(FairSpillPool::new(300 * 1024 * 1024)))
.build_arc()
.unwrap();
let mut config = SessionConfig::new().with_sort_spill_reservation_bytes(32 * 1024 * 1024).with_target_partitions(1);
config.options_mut().execution.parquet.schema_force_view_types = false;
let ctx = SessionContext::new_with_config_rt(config, env);
ctx.register_parquet(
"big_strings",
"/tmp/big_strings.parquet",
ParquetReadOptions::default(),
)
.await
.unwrap();
let sql = "SELECT * FROM big_strings ORDER BY strings";
println!("Sorting strings");
ctx.sql(sql)
.await
.unwrap()
.execute_stream()
.await
.unwrap()
.try_for_each(|_| std::future::ready(Ok(())))
.await
.unwrap();
}
fn build_parquet() {
if std::fs::File::open("/tmp/big_strings.parquet").is_ok() {
println!("Using existing file at /tmp/big_strings.parquet");
return;
}
println!("Generating test file at /tmp/big_strings.parquet");
let file = std::fs::File::create("/tmp/big_strings.parquet").unwrap();
let schema = Arc::new(Schema::new(vec![Field::new(
"strings",
DataType::Utf8,
false,
)]));
let mut writer = ArrowWriter::try_new(file, schema.clone(), None).unwrap();
for batch_idx in 0..100 {
println!("Generating batch {} of 100", batch_idx);
let mut string_array_builder =
StringBuilder::with_capacity(1024 * 1024, 1024 * 1024 * 3 * 14);
for i in 0..(1024 * 1024) {
string_array_builder
.append_value(format!("string-{}string-{}string-{}", i, i, i));
}
let array = Arc::new(string_array_builder.finish());
let batch = RecordBatch::try_new(schema.clone(), vec![array]).unwrap();
writer.write(&batch).unwrap();
}
writer.close().unwrap();
}

Detailed reasons here:
And for future work we can have a complete spill solution tracked here:
Thanks again for the nice work.
This is a great observation and I think it holds true in practice. However, I don't think it is reflected in the documentation here. Would someone be willing to update those docs?
This is another great observation that would be great to capture in a ticket.
Thanks everyone -- this is a great example of collaboration and what the collective efforts of many contributors can do. I love it.
❤️ 🦾
Thank you for this. You are correct: with this addition my test passes. #14692 sounds awesome, but I think we can also go far with these current fixes in the meantime 🚀
Created a follow-up for this improvement: |
Thanks @Kontinuation @zhuqi-lucas @2010YOUY01 @alamb and everyone, awesome team effort like Andrew mentioned. I think this ticket's tests and experience would be beneficial for #14510.
Which issue does this PR close?
Rationale for this change
I had a hard time making DataFusion Comet work on cloud instances with 4GB of memory per CPU core, partially because DataFusion is very likely to allocate more memory than reserved and run into OOM, or run into various kinds of memory reservation failures. In cases where the partition to process is larger than the available memory, we expect spilling to happen so the query runs to completion, but we got tons of failures instead. We found that operators involving SortExec, such as sort-merge join, trigger the aforementioned problems frequently.
#10073 reports that SortExec may allocate 2X the memory it reserves (see "the second problem" in the issue), and we found that it contributed to most of the OOM cases we encountered when using Comet. We have also found several other problems related to SortExec that are critical for our memory-limited use cases, and this PR tries to address some of them.
What changes are included in this PR?
This PR contains several fixes:
1. Avoid calling `try_collect` on the result of merging all at once. We consume the merged batches one after another and reserve memory for each batch. Once the reservation fails we switch to "spill mode" and write all future batches into the spill file. This resolves the 2X memory allocation problem ("the second problem") reported by Memory account not adding up in SortExec #10073, as well as this comment: External sorting not working for (maybe only for string columns??) #12136 (comment)
2. `shrink_to_fit` every sorted batch to reduce the memory footprint of sorted batches; otherwise sorted string arrays may take 2X the original space in the worst case, due to exponential growth of `MutableBuffer` for storing variable-length binary values. `shrink_to_fit` is a no-op for primitive-type columns returned by `take_arrays` since they already have the right capacity, and benchmarking showed no significant performance regression for non-primitive types such as string arrays, so I think it is a good change. This resolves "the first problem" reported by Memory account not adding up in SortExec #10073.
3. Reserve memory more aggressively for ingested batches to leave room for merging. Users still have to configure `sort_spill_reservation_bytes` when the memory reserved is not big enough for merging. I don't think it is a good change, but this is the only solution I can think of to compensate for the extra memory usage of the row representation of sorted columns.

The problems with SortExec are not easy to solve without introducing significant changes to the overall design; 3) of this PR is mostly a bandaid solution. I believe that the implementation needs to be revamped to make all the memory reservation/spilling behave correctly.
Are these changes tested?
Yes. It passes all the tests.
Are there any user-facing changes?
Users may find that the sort operator is more likely to spill when running with memory constraints. The old configurations they used to make the sort operator work may not be optimal after applying this PR. For instance, a user may have configured a super large `sort_spill_reservation_bytes` to make merging work, but this PR reduces the optimal value of `sort_spill_reservation_bytes` for the same workload.